package com.atguigu.flink.state;

import com.alibaba.fastjson.JSON;
import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.UserBehavior;
import com.atguigu.flink.pojo.WaterSensor;
import com.atguigu.flink.utils.MyUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Created by Smexy on 2023/11/17
 *
 * <p>Hourly Top-3 most-viewed items, computed inside a single non-keyed ("all") window.
 *
 * <p><b>Input</b>: one line per user action on an item, formatted as
 * {@code userId,itemId,categoryId,behavior,ts} where {@code ts} is in seconds, e.g.
 * {@code 543462,1715,1464116,pv,1511658000}.
 *
 * <p><b>Processing</b>:
 * <ol>
 *   <li>Parse each line and keep only the {@code pv} records.</li>
 *   <li>Assign everything to a tumbling event-time window of 1h (size 1h, slide 1h).
 *       A Top-N needs the counts of <em>all</em> items of the period in one place,
 *       so the window is a non-keyed window created with {@code windowAll}.</li>
 *   <li>First aggregation (count): per window, count the pv records of every item,
 *       e.g. for [8:00,9:00): a--&gt;200, b--&gt;150, c--&gt;100, d--&gt;90, e--&gt;80.</li>
 *   <li>Second aggregation (topN): sort those counts in descending order and keep
 *       the first three.</li>
 * </ol>
 *
 * <p>Both steps are wired together with
 * {@code aggregate(AggregateFunction, AllWindowFunction)}; the analogous
 * {@code reduce(ReduceFunction, AllWindowFunction)} exists for reduce-style aggregation.
 *
 * <p><b>Output</b>: one line per window, e.g.
 * {@code [8:00,9:00) top3: a-->200,b-->150,c-->100}
 * (the window prefix is whatever {@code MyUtil.parseTimeWindow} returns).
 */
public class Demo2_TopNAllWindow
{
    /** Number of items reported per window. */
    private static final int TOP_N = 3;

    /**
     * Builds and runs the job: file source, parse, pv filter, event-time 1h window,
     * per-item count, Top-N line printed to stdout.
     *
     * @param args unused
     * @throws IllegalStateException if job execution fails; the original exception is the cause
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        FileSource<String> source = FileSource.forRecordStreamFormat(
                                                  new TextLineInputFormat(),
                                                  new Path("data/UserBehavior.csv"))
                                              .build();

        // Event time comes from UserBehavior.getTs(), which the parser below fills
        // with milliseconds (seconds * 1000).
        WatermarkStrategy<UserBehavior> watermarkStrategy = WatermarkStrategy
            .<UserBehavior>forMonotonousTimestamps()
            .withTimestampAssigner((e, ts) -> e.getTs());

        env
            // No watermarks at the source: they are assigned after parsing and filtering.
            .fromSource(source, WatermarkStrategy.noWatermarks(), "data")
            .map(new UserBehaviorParser())
            // "Hot" is defined by the number of page views, so keep only pv records.
            .filter(u -> "pv".equals(u.getBehavior()))
            .assignTimestampsAndWatermarks(watermarkStrategy)
            .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
            .aggregate(new ItemPvCounter(), new TopNFormatter())
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate instead of only printing, so a failed job does not look like a
            // successful run to whoever launched the process.
            throw new IllegalStateException("Flink job Demo2_TopNAllWindow failed", e);
        }

    }

    /** Parses one CSV line {@code userId,itemId,categoryId,behavior,ts(seconds)}. */
    private static class UserBehaviorParser implements MapFunction<String, UserBehavior>
    {
        /**
         * @param value a comma-separated line with at least five fields
         * @return the parsed record with its timestamp converted from seconds to milliseconds
         * @throws Exception on malformed input (too few fields or a non-numeric timestamp)
         */
        @Override
        public UserBehavior map(String value) throws Exception {
            String[] words = value.split(",");
            return new UserBehavior(
                words[0],
                words[1],
                words[2],
                words[3],
                Long.parseLong(words[4]) * 1000
            );
        }
    }

    /**
     * First aggregation: counts the records of each itemId within a window,
     * e.g. a--&gt;200, b--&gt;150, c--&gt;100, d--&gt;90, e--&gt;80.
     * The accumulator and the result are the same {@code itemId -> count} map.
     */
    private static class ItemPvCounter
        implements AggregateFunction<UserBehavior, Map<String, Integer>, Map<String, Integer>>
    {
        @Override
        public Map<String, Integer> createAccumulator() {
            return new HashMap<>();
        }

        @Override
        public Map<String, Integer> add(UserBehavior value, Map<String, Integer> acc) {
            acc.merge(value.getItemId(), 1, Integer::sum);
            return acc;
        }

        @Override
        public Map<String, Integer> getResult(Map<String, Integer> accumulator) {
            return accumulator;
        }

        /**
         * Combines two partial count maps by summing the counts per itemId.
         * Tumbling windows are not expected to need this, but returning {@code null}
         * here would break any setup in which accumulators do get merged.
         */
        @Override
        public Map<String, Integer> merge(Map<String, Integer> a, Map<String, Integer> b) {
            b.forEach((itemId, pv) -> a.merge(itemId, pv, Integer::sum));
            return a;
        }
    }

    /**
     * Second aggregation: turns the per-item counts of one window into a single line
     * such as {@code [8:00,9:00) top3: a-->200,b-->150,c-->100}.
     */
    private static class TopNFormatter
        implements AllWindowFunction<Map<String, Integer>, String, TimeWindow>
    {
        /**
         * @param window the window the counts belong to
         * @param values the pre-aggregated result; only its first element is read
         * @param out    receives exactly one formatted line
         */
        @Override
        public void apply(TimeWindow window, Iterable<Map<String, Integer>> values, Collector<String> out) throws Exception {
            Map<String, Integer> pvByItem = values.iterator().next();

            // Highest count first; equal counts are ordered by itemId so the output
            // does not depend on HashMap iteration order.
            Comparator<Map.Entry<String, Integer>> byPvDesc =
                Map.Entry.<String, Integer>comparingByValue(Comparator.reverseOrder());
            Comparator<Map.Entry<String, Integer>> ranking =
                byPvDesc.thenComparing(Map.Entry.comparingByKey());

            List<Map.Entry<String, Integer>> topEntries = pvByItem.entrySet()
                .stream()
                .sorted(ranking)
                .limit(TOP_N)
                .collect(Collectors.toList());

            // Window prefix; the exact format is defined by MyUtil.parseTimeWindow.
            String timeStr = MyUtil.parseTimeWindow(window);

            String topStr = topEntries
                .stream()
                .map(entry -> entry.getKey() + "-->" + entry.getValue())
                .collect(Collectors.joining(","));

            out.collect(timeStr + " top" + TOP_N + ": " + topStr);
        }
    }
}
